From 995c2dc4d90044886fad5915ac711e503a1c2a7a Mon Sep 17 00:00:00 2001 From: Darius Date: Sun, 16 Jul 2023 16:41:23 -0400 Subject: [PATCH] refactor: Migrate to quick-protobuf --- Cargo.lock | 2 +- Cargo.toml | 5 +- build.rs | 4 - src/compat/message.rs | 56 ++--- src/compat/mod.rs | 1 + src/compat/{ => pb}/bitswap_pb.proto | 0 src/compat/pb/bitswap_pb.rs | 298 +++++++++++++++++++++++++++ src/compat/pb/mod.rs | 2 + 8 files changed, 338 insertions(+), 30 deletions(-) delete mode 100644 build.rs rename src/compat/{ => pb}/bitswap_pb.proto (100%) create mode 100644 src/compat/pb/bitswap_pb.rs create mode 100644 src/compat/pb/mod.rs diff --git a/Cargo.lock b/Cargo.lock index c80089f..9a63994 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1183,8 +1183,8 @@ dependencies = [ "libp2p", "multihash 0.17.0", "prometheus", - "prost", "prost-build", + "quick-protobuf", "thiserror", "tracing", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index 8ebba9c..6f4dded 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0" repository = "https://github.com/ipfs-rust/libp2p-bitswap" [features] -compat = ["prost", "prost-build"] +compat = ["quick-protobuf"] [build-dependencies] prost-build = { version = "0.11", optional = true } @@ -21,11 +21,12 @@ lazy_static = "1.4.0" libipld = { version = "0.15.0", default-features = false } libp2p = { version = "0.52", features = ["request-response"] } prometheus = "0.13.0" -prost = { version = "0.11", optional = true } thiserror = "1.0.30" tracing = "0.1.29" unsigned-varint = { version = "0.7.1", features = ["futures", "std"] } +quick-protobuf = { version = "0.8.1", optional = true } + [dev-dependencies] async-std = { version = "1.10.0", features = ["attributes"] } env_logger = "0.9.0" diff --git a/build.rs b/build.rs deleted file mode 100644 index 6b7fa7b..0000000 --- a/build.rs +++ /dev/null @@ -1,4 +0,0 @@ -fn main() { - #[cfg(feature = "compat")] - prost_build::compile_protos(&["src/compat/bitswap_pb.proto"], &["src/compat"]).unwrap(); -} diff --git a/src/compat/message.rs b/src/compat/message.rs index 0c3f1f9..4b183f5 100644 --- a/src/compat/message.rs +++ b/src/compat/message.rs @@ -2,12 +2,18 @@ use crate::compat::other; use crate::compat::prefix::Prefix; use crate::protocol::{BitswapRequest, BitswapResponse, RequestType}; use libipld::Cid; -use prost::Message; +use quick_protobuf::{BytesReader, MessageRead, MessageWrite, Writer}; use std::convert::TryFrom; use std::io; mod bitswap_pb { - include!(concat!(env!("OUT_DIR"), "/bitswap_pb.rs")); + pub use super::super::pb::bitswap_pb::Message; + pub mod message { + use super::super::super::pb::bitswap_pb::mod_Message as message; + pub use message::mod_Wantlist as wantlist; + pub use message::Wantlist; + pub use message::{Block, BlockPresence, BlockPresenceType}; + } } #[derive(Clone, Debug, Eq, PartialEq)] @@ -23,12 +29,12 @@ impl CompatMessage { CompatMessage::Request(BitswapRequest { ty, cid }) => { let mut wantlist = bitswap_pb::message::Wantlist::default(); let entry = bitswap_pb::message::wantlist::Entry { - block: cid.to_bytes(), - want_type: match ty { + block: cid.to_bytes().into(), + wantType: match ty { RequestType::Have => bitswap_pb::message::wantlist::WantType::Have, RequestType::Block => bitswap_pb::message::wantlist::WantType::Block, } as _, - send_dont_have: true, + sendDontHave: true, cancel: false, priority: 1, }; @@ -37,42 +43,46 @@ impl CompatMessage { } CompatMessage::Response(cid, BitswapResponse::Have(have)) => { let block_presence = bitswap_pb::message::BlockPresence { - cid: cid.to_bytes(), - r#type: if *have { + cid: cid.to_bytes().into(), + type_pb: if *have { bitswap_pb::message::BlockPresenceType::Have } else { bitswap_pb::message::BlockPresenceType::DontHave } as _, }; - msg.block_presences.push(block_presence); + msg.blockPresences.push(block_presence); } CompatMessage::Response(cid, BitswapResponse::Block(bytes)) => { let payload = bitswap_pb::message::Block { - prefix: Prefix::from(cid).to_bytes(), - data: bytes.to_vec(), + prefix: Prefix::from(cid).to_bytes().into(), + data: bytes.into(), }; msg.payload.push(payload); } } - let mut bytes = Vec::with_capacity(msg.encoded_len()); - msg.encode(&mut bytes).map_err(other)?; + + let mut bytes = Vec::with_capacity(msg.get_size()); + let mut writer = Writer::new(&mut bytes); + msg.write_message(&mut writer).map_err(other)?; Ok(bytes) } pub fn from_bytes(bytes: &[u8]) -> io::Result> { - let msg = bitswap_pb::Message::decode(bytes)?; + let mut reader = BytesReader::from_bytes(bytes); + let msg = bitswap_pb::Message::from_reader(&mut reader, bytes).map_err(other)?; + let mut parts = vec![]; for entry in msg.wantlist.unwrap_or_default().entries { - if !entry.send_dont_have { + if !entry.sendDontHave { tracing::error!("message hasn't set `send_dont_have`: skipping"); continue; } - let cid = Cid::try_from(entry.block).map_err(other)?; - let ty = match entry.want_type { - ty if bitswap_pb::message::wantlist::WantType::Have as i32 == ty => { + let cid = Cid::try_from(&*entry.block).map_err(other)?; + let ty = match entry.wantType { + ty if bitswap_pb::message::wantlist::WantType::Have == ty => { RequestType::Have } - ty if bitswap_pb::message::wantlist::WantType::Block as i32 == ty => { + ty if bitswap_pb::message::wantlist::WantType::Block == ty => { RequestType::Block } _ => { @@ -90,11 +100,11 @@ impl CompatMessage { BitswapResponse::Block(payload.data.to_vec()), )); } - for presence in msg.block_presences { - let cid = Cid::try_from(presence.cid).map_err(other)?; - let have = match presence.r#type { - ty if bitswap_pb::message::BlockPresenceType::Have as i32 == ty => true, - ty if bitswap_pb::message::BlockPresenceType::DontHave as i32 == ty => false, + for presence in msg.blockPresences { + let cid = Cid::try_from(&*presence.cid).map_err(other)?; + let have = match presence.type_pb { + ty if bitswap_pb::message::BlockPresenceType::Have == ty => true, + ty if bitswap_pb::message::BlockPresenceType::DontHave == ty => false, _ => { tracing::error!("invalid block presence type: skipping"); continue; diff --git a/src/compat/mod.rs b/src/compat/mod.rs index b3400c2..4d591ce 100644 --- a/src/compat/mod.rs +++ b/src/compat/mod.rs @@ -1,6 +1,7 @@ mod message; mod prefix; mod protocol; +mod pb; pub use message::CompatMessage; pub use protocol::{CompatProtocol, InboundMessage}; diff --git a/src/compat/bitswap_pb.proto b/src/compat/pb/bitswap_pb.proto similarity index 100% rename from src/compat/bitswap_pb.proto rename to src/compat/pb/bitswap_pb.proto diff --git a/src/compat/pb/bitswap_pb.rs b/src/compat/pb/bitswap_pb.rs new file mode 100644 index 0000000..2f1784c --- /dev/null +++ b/src/compat/pb/bitswap_pb.rs @@ -0,0 +1,298 @@ +// Automatically generated rust module for 'bitswap_pb.proto' file + +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(non_camel_case_types)] +#![allow(unused_imports)] +#![allow(unknown_lints)] +#![allow(clippy::all)] +#![cfg_attr(rustfmt, rustfmt_skip)] + + +use std::borrow::Cow; +use quick_protobuf::{MessageInfo, MessageRead, MessageWrite, BytesReader, Writer, WriterBackend, Result}; +use quick_protobuf::sizeofs::*; +use super::*; + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct Message<'a> { + pub wantlist: Option>, + pub blocks: Vec>, + pub payload: Vec>, + pub blockPresences: Vec>, + pub pendingBytes: i32, +} + +impl<'a> MessageRead<'a> for Message<'a> { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(10) => msg.wantlist = Some(r.read_message::(bytes)?), + Ok(18) => msg.blocks.push(r.read_bytes(bytes).map(Cow::Borrowed)?), + Ok(26) => msg.payload.push(r.read_message::(bytes)?), + Ok(34) => msg.blockPresences.push(r.read_message::(bytes)?), + Ok(40) => msg.pendingBytes = r.read_int32(bytes)?, + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl<'a> MessageWrite for Message<'a> { + fn get_size(&self) -> usize { + 0 + + self.wantlist.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size())) + + self.blocks.iter().map(|s| 1 + sizeof_len((s).len())).sum::() + + self.payload.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + + self.blockPresences.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + + if self.pendingBytes == 0i32 { 0 } else { 1 + sizeof_varint(*(&self.pendingBytes) as u64) } + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if let Some(ref s) = self.wantlist { w.write_with_tag(10, |w| w.write_message(s))?; } + for s in &self.blocks { w.write_with_tag(18, |w| w.write_bytes(&**s))?; } + for s in &self.payload { w.write_with_tag(26, |w| w.write_message(s))?; } + for s in &self.blockPresences { w.write_with_tag(34, |w| w.write_message(s))?; } + if self.pendingBytes != 0i32 { w.write_with_tag(40, |w| w.write_int32(*&self.pendingBytes))?; } + Ok(()) + } +} + +pub mod mod_Message { + +use std::borrow::Cow; +use super::*; + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct Wantlist<'a> { + pub entries: Vec>, + pub full: bool, +} + +impl<'a> MessageRead<'a> for Wantlist<'a> { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(10) => msg.entries.push(r.read_message::(bytes)?), + Ok(16) => msg.full = r.read_bool(bytes)?, + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl<'a> MessageWrite for Wantlist<'a> { + fn get_size(&self) -> usize { + 0 + + self.entries.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::() + + if self.full == false { 0 } else { 1 + sizeof_varint(*(&self.full) as u64) } + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + for s in &self.entries { w.write_with_tag(10, |w| w.write_message(s))?; } + if self.full != false { w.write_with_tag(16, |w| w.write_bool(*&self.full))?; } + Ok(()) + } +} + +pub mod mod_Wantlist { + +use std::borrow::Cow; +use super::*; + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct Entry<'a> { + pub block: Cow<'a, [u8]>, + pub priority: i32, + pub cancel: bool, + pub wantType: bitswap_pb::mod_Message::mod_Wantlist::WantType, + pub sendDontHave: bool, +} + +impl<'a> MessageRead<'a> for Entry<'a> { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(10) => msg.block = r.read_bytes(bytes).map(Cow::Borrowed)?, + Ok(16) => msg.priority = r.read_int32(bytes)?, + Ok(24) => msg.cancel = r.read_bool(bytes)?, + Ok(32) => msg.wantType = r.read_enum(bytes)?, + Ok(40) => msg.sendDontHave = r.read_bool(bytes)?, + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl<'a> MessageWrite for Entry<'a> { + fn get_size(&self) -> usize { + 0 + + if self.block == Cow::Borrowed(b"") { 0 } else { 1 + sizeof_len((&self.block).len()) } + + if self.priority == 0i32 { 0 } else { 1 + sizeof_varint(*(&self.priority) as u64) } + + if self.cancel == false { 0 } else { 1 + sizeof_varint(*(&self.cancel) as u64) } + + if self.wantType == bitswap_pb::mod_Message::mod_Wantlist::WantType::Block { 0 } else { 1 + sizeof_varint(*(&self.wantType) as u64) } + + if self.sendDontHave == false { 0 } else { 1 + sizeof_varint(*(&self.sendDontHave) as u64) } + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if self.block != Cow::Borrowed(b"") { w.write_with_tag(10, |w| w.write_bytes(&**&self.block))?; } + if self.priority != 0i32 { w.write_with_tag(16, |w| w.write_int32(*&self.priority))?; } + if self.cancel != false { w.write_with_tag(24, |w| w.write_bool(*&self.cancel))?; } + if self.wantType != bitswap_pb::mod_Message::mod_Wantlist::WantType::Block { w.write_with_tag(32, |w| w.write_enum(*&self.wantType as i32))?; } + if self.sendDontHave != false { w.write_with_tag(40, |w| w.write_bool(*&self.sendDontHave))?; } + Ok(()) + } +} + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum WantType { + Block = 0, + Have = 1, +} + +impl Default for WantType { + fn default() -> Self { + WantType::Block + } +} + +impl From for WantType { + fn from(i: i32) -> Self { + match i { + 0 => WantType::Block, + 1 => WantType::Have, + _ => Self::default(), + } + } +} + +impl<'a> From<&'a str> for WantType { + fn from(s: &'a str) -> Self { + match s { + "Block" => WantType::Block, + "Have" => WantType::Have, + _ => Self::default(), + } + } +} + +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct Block<'a> { + pub prefix: Cow<'a, [u8]>, + pub data: Cow<'a, [u8]>, +} + +impl<'a> MessageRead<'a> for Block<'a> { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(10) => msg.prefix = r.read_bytes(bytes).map(Cow::Borrowed)?, + Ok(18) => msg.data = r.read_bytes(bytes).map(Cow::Borrowed)?, + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl<'a> MessageWrite for Block<'a> { + fn get_size(&self) -> usize { + 0 + + if self.prefix == Cow::Borrowed(b"") { 0 } else { 1 + sizeof_len((&self.prefix).len()) } + + if self.data == Cow::Borrowed(b"") { 0 } else { 1 + sizeof_len((&self.data).len()) } + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if self.prefix != Cow::Borrowed(b"") { w.write_with_tag(10, |w| w.write_bytes(&**&self.prefix))?; } + if self.data != Cow::Borrowed(b"") { w.write_with_tag(18, |w| w.write_bytes(&**&self.data))?; } + Ok(()) + } +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct BlockPresence<'a> { + pub cid: Cow<'a, [u8]>, + pub type_pb: bitswap_pb::mod_Message::BlockPresenceType, +} + +impl<'a> MessageRead<'a> for BlockPresence<'a> { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(10) => msg.cid = r.read_bytes(bytes).map(Cow::Borrowed)?, + Ok(16) => msg.type_pb = r.read_enum(bytes)?, + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl<'a> MessageWrite for BlockPresence<'a> { + fn get_size(&self) -> usize { + 0 + + if self.cid == Cow::Borrowed(b"") { 0 } else { 1 + sizeof_len((&self.cid).len()) } + + if self.type_pb == bitswap_pb::mod_Message::BlockPresenceType::Have { 0 } else { 1 + sizeof_varint(*(&self.type_pb) as u64) } + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if self.cid != Cow::Borrowed(b"") { w.write_with_tag(10, |w| w.write_bytes(&**&self.cid))?; } + if self.type_pb != bitswap_pb::mod_Message::BlockPresenceType::Have { w.write_with_tag(16, |w| w.write_enum(*&self.type_pb as i32))?; } + Ok(()) + } +} + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum BlockPresenceType { + Have = 0, + DontHave = 1, +} + +impl Default for BlockPresenceType { + fn default() -> Self { + BlockPresenceType::Have + } +} + +impl From for BlockPresenceType { + fn from(i: i32) -> Self { + match i { + 0 => BlockPresenceType::Have, + 1 => BlockPresenceType::DontHave, + _ => Self::default(), + } + } +} + +impl<'a> From<&'a str> for BlockPresenceType { + fn from(s: &'a str) -> Self { + match s { + "Have" => BlockPresenceType::Have, + "DontHave" => BlockPresenceType::DontHave, + _ => Self::default(), + } + } +} + +} + diff --git a/src/compat/pb/mod.rs b/src/compat/pb/mod.rs new file mode 100644 index 0000000..833514c --- /dev/null +++ b/src/compat/pb/mod.rs @@ -0,0 +1,2 @@ +// Automatically generated mod.rs +pub mod bitswap_pb;