Skip to content

Commit 4adb294

Browse files
authored
Merge pull request #109 from nicklan/new-data-passing-api
Switch to new data passing API
2 parents 5f48dea + 14d0aff commit 4adb294

27 files changed

+2262
-1345
lines changed

kernel/Cargo.toml

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,34 +39,41 @@ arrow-ord = { version = "^49.0", optional = true }
3939
arrow-schema = { version = "^49.0", optional = true }
4040
futures = { version = "0.3", optional = true }
4141
object_store = { version = "^0.8.0", optional = true }
42-
parquet = { version = "^49.0", optional = true, features = [
43-
"async",
44-
"object_store",
45-
] }
42+
# Used in default and simple client
43+
parquet = { version = "^49.0", optional = true }
4644

4745
# optionally used with default client (though not required)
4846
tokio = { version = "1", optional = true, features = ["rt-multi-thread"] }
4947

5048
[features]
51-
default = ["default-client"]
49+
arrow-conversion = ["arrow-schema"]
50+
default = ["simple-client"]
5251
default-client = [
52+
"arrow-conversion",
5353
"arrow-arith",
5454
"arrow-json",
5555
"arrow-ord",
5656
"arrow-schema",
5757
"futures",
5858
"object_store",
59-
"parquet",
59+
"parquet/async",
60+
"parquet/object_store",
61+
"tokio"
6062
]
63+
6164
developer-visibility = []
65+
simple-client = [
66+
"arrow-conversion",
67+
"arrow-json",
68+
"parquet"
69+
]
6270

6371
[dev-dependencies]
6472
arrow = { version = "^49.0", features = ["json", "prettyprint"] }
65-
deltakernel = { path = ".", features = ["tokio"] }
73+
deltakernel = { path = ".", features = ["default-client"] }
6674
test-log = { version = "0.2", default-features = false, features = ["trace"] }
6775
tempfile = "3"
6876
test-case = { version = "3.1.0" }
69-
tokio = { version = "1" }
7077
tracing-subscriber = { version = "0.3", default-features = false, features = [
7178
"env-filter",
7279
"fmt",

kernel/src/actions/deletion_vector.rs

Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
//! Code relating to parsing and using deletion vectors
2+
3+
use std::io::{Cursor, Read};
4+
use std::sync::Arc;
5+
6+
use roaring::RoaringTreemap;
7+
use url::Url;
8+
9+
use crate::{DeltaResult, Error, FileSystemClient};
10+
11+
#[derive(Debug, Clone, PartialEq, Eq)]
12+
pub struct DeletionVectorDescriptor {
13+
/// A single character to indicate how to access the DV. Legal options are: ['u', 'i', 'p'].
14+
pub storage_type: String,
15+
16+
/// Three format options are currently proposed:
17+
/// - If `storageType = 'u'` then `<random prefix - optional><base85 encoded uuid>`:
18+
/// The deletion vector is stored in a file with a path relative to the data
19+
/// directory of this Delta table, and the file name can be reconstructed from
20+
/// the UUID. See Derived Fields for how to reconstruct the file name. The random
21+
/// prefix is recovered as the extra characters before the (20 characters fixed length) uuid.
22+
/// - If `storageType = 'i'` then `<base85 encoded bytes>`: The deletion vector
23+
/// is stored inline in the log. The format used is the `RoaringBitmapArray`
24+
/// format also used when the DV is stored on disk and described in [Deletion Vector Format].
25+
/// - If `storageType = 'p'` then `<absolute path>`: The DV is stored in a file with an
26+
/// absolute path given by this path, which has the same format as the `path` field
27+
/// in the `add`/`remove` actions.
28+
///
29+
/// [Deletion Vector Format]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Deletion-Vector-Format
30+
pub path_or_inline_dv: String,
31+
32+
/// Start of the data for this DV in number of bytes from the beginning of the file it is stored in.
33+
/// Always None (absent in JSON) when `storageType = 'i'`.
34+
pub offset: Option<i32>,
35+
36+
/// Size of the serialized DV in bytes (raw data size, i.e. before base85 encoding, if inline).
37+
pub size_in_bytes: i32,
38+
39+
/// Number of rows the given DV logically removes from the file.
40+
pub cardinality: i64,
41+
}
42+
43+
impl DeletionVectorDescriptor {
44+
pub fn unique_id(&self) -> String {
45+
if let Some(offset) = self.offset {
46+
format!("{}{}@{offset}", self.storage_type, self.path_or_inline_dv)
47+
} else {
48+
format!("{}{}", self.storage_type, self.path_or_inline_dv)
49+
}
50+
}
51+
52+
pub fn absolute_path(&self, parent: &Url) -> DeltaResult<Option<Url>> {
53+
match self.storage_type.as_str() {
54+
"u" => {
55+
let prefix_len = self.path_or_inline_dv.len() as i32 - 20;
56+
if prefix_len < 0 {
57+
return Err(Error::deletion_vector("Invalid length"));
58+
}
59+
let decoded = z85::decode(&self.path_or_inline_dv[(prefix_len as usize)..])
60+
.map_err(|_| Error::deletion_vector("Failed to decode DV uuid"))?;
61+
let uuid = uuid::Uuid::from_slice(&decoded)
62+
.map_err(|err| Error::DeletionVector(err.to_string()))?;
63+
let dv_suffix = if prefix_len > 0 {
64+
format!(
65+
"{}/deletion_vector_{uuid}.bin",
66+
&self.path_or_inline_dv[..(prefix_len as usize)]
67+
)
68+
} else {
69+
format!("deletion_vector_{uuid}.bin")
70+
};
71+
let dv_path = parent
72+
.join(&dv_suffix)
73+
.map_err(|_| Error::DeletionVector(format!("invalid path: {dv_suffix}")))?;
74+
Ok(Some(dv_path))
75+
}
76+
"p" => Ok(Some(Url::parse(&self.path_or_inline_dv).map_err(|_| {
77+
Error::DeletionVector(format!("invalid path: {}", self.path_or_inline_dv))
78+
})?)),
79+
"i" => Ok(None),
80+
other => Err(Error::DeletionVector(format!(
81+
"Unknown storage format: '{other}'."
82+
))),
83+
}
84+
}
85+
86+
pub fn read(
87+
&self,
88+
fs_client: Arc<dyn FileSystemClient>,
89+
parent: Url,
90+
) -> DeltaResult<RoaringTreemap> {
91+
match self.absolute_path(&parent)? {
92+
None => {
93+
let bytes = z85::decode(&self.path_or_inline_dv)
94+
.map_err(|_| Error::deletion_vector("Failed to decode DV"))?;
95+
RoaringTreemap::deserialize_from(&bytes[12..])
96+
.map_err(|err| Error::DeletionVector(err.to_string()))
97+
}
98+
Some(path) => {
99+
let offset = self.offset;
100+
let size_in_bytes = self.size_in_bytes;
101+
102+
let dv_data = fs_client
103+
.read_files(vec![(path, None)])?
104+
.next()
105+
.ok_or(Error::missing_data("No deletion vector data"))??;
106+
107+
let mut cursor = Cursor::new(dv_data);
108+
if let Some(offset) = offset {
109+
// TODO should we read the datasize from the DV file?
110+
// offset plus datasize bytes
111+
cursor.set_position((offset + 4) as u64);
112+
}
113+
114+
let mut buf = vec![0; 4];
115+
cursor
116+
.read(&mut buf)
117+
.map_err(|err| Error::DeletionVector(err.to_string()))?;
118+
let magic = i32::from_le_bytes(
119+
buf.try_into()
120+
.map_err(|_| Error::deletion_vector("failed to read magic bytes"))?,
121+
);
122+
if magic != 1681511377 {
123+
return Err(Error::DeletionVector(format!("Invalid magic {magic}")));
124+
}
125+
126+
let mut buf = vec![0; size_in_bytes as usize];
127+
cursor
128+
.read(&mut buf)
129+
.map_err(|err| Error::DeletionVector(err.to_string()))?;
130+
131+
RoaringTreemap::deserialize_from(Cursor::new(buf))
132+
.map_err(|err| Error::DeletionVector(err.to_string()))
133+
}
134+
}
135+
}
136+
}
137+
138+
pub(crate) fn treemap_to_bools(treemap: RoaringTreemap) -> Vec<bool> {
139+
fn combine(high_bits: u32, low_bits: u32) -> usize {
140+
((u64::from(high_bits) << 32) | u64::from(low_bits)) as usize
141+
}
142+
143+
match treemap.max() {
144+
Some(max) => {
145+
// there are values in the map
146+
//TODO(nick) panic if max is > MAX_USIZE
147+
let mut result = vec![true; max as usize + 1];
148+
let bitmaps = treemap.bitmaps();
149+
for (index, bitmap) in bitmaps {
150+
for bit in bitmap.iter() {
151+
let vec_index = combine(index, bit);
152+
result[vec_index] = false;
153+
}
154+
}
155+
result
156+
}
157+
None => {
158+
// empty set, return empty vec
159+
vec![]
160+
}
161+
}
162+
}
163+
164+
#[cfg(test)]
165+
mod tests {
166+
use roaring::RoaringTreemap;
167+
use std::path::PathBuf;
168+
169+
use super::*;
170+
use crate::{simple_client::SimpleClient, EngineInterface};
171+
172+
use super::DeletionVectorDescriptor;
173+
174+
fn dv_relative() -> DeletionVectorDescriptor {
175+
DeletionVectorDescriptor {
176+
storage_type: "u".to_string(),
177+
path_or_inline_dv: "ab^-aqEH.-t@S}K{vb[*k^".to_string(),
178+
offset: Some(4),
179+
size_in_bytes: 40,
180+
cardinality: 6,
181+
}
182+
}
183+
184+
fn dv_absolute() -> DeletionVectorDescriptor {
185+
DeletionVectorDescriptor {
186+
storage_type: "p".to_string(),
187+
path_or_inline_dv:
188+
"s3://mytable/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin".to_string(),
189+
offset: Some(4),
190+
size_in_bytes: 40,
191+
cardinality: 6,
192+
}
193+
}
194+
195+
fn dv_inline() -> DeletionVectorDescriptor {
196+
DeletionVectorDescriptor {
197+
storage_type: "i".to_string(),
198+
path_or_inline_dv: "wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L".to_string(),
199+
offset: None,
200+
size_in_bytes: 40,
201+
cardinality: 6,
202+
}
203+
}
204+
205+
fn dv_example() -> DeletionVectorDescriptor {
206+
DeletionVectorDescriptor {
207+
storage_type: "u".to_string(),
208+
path_or_inline_dv: "vBn[lx{q8@P<9BNH/isA".to_string(),
209+
offset: Some(1),
210+
size_in_bytes: 36,
211+
cardinality: 2,
212+
}
213+
}
214+
215+
#[test]
216+
fn test_deletion_vector_absolute_path() {
217+
let parent = Url::parse("s3://mytable/").unwrap();
218+
219+
let relative = dv_relative();
220+
let expected =
221+
Url::parse("s3://mytable/ab/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin")
222+
.unwrap();
223+
assert_eq!(expected, relative.absolute_path(&parent).unwrap().unwrap());
224+
225+
let absolute = dv_absolute();
226+
let expected =
227+
Url::parse("s3://mytable/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin")
228+
.unwrap();
229+
assert_eq!(expected, absolute.absolute_path(&parent).unwrap().unwrap());
230+
231+
let inline = dv_inline();
232+
assert_eq!(None, inline.absolute_path(&parent).unwrap());
233+
234+
let path =
235+
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
236+
let parent = url::Url::from_directory_path(path).unwrap();
237+
let dv_url = parent
238+
.join("deletion_vector_61d16c75-6994-46b7-a15b-8b538852e50e.bin")
239+
.unwrap();
240+
let example = dv_example();
241+
assert_eq!(dv_url, example.absolute_path(&parent).unwrap().unwrap());
242+
}
243+
244+
#[test]
245+
fn test_deletion_vector_read() {
246+
let path =
247+
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
248+
let parent = url::Url::from_directory_path(path).unwrap();
249+
let simple_client = SimpleClient::new();
250+
let fs_client = simple_client.get_file_system_client();
251+
252+
let example = dv_example();
253+
let tree_map = example.read(fs_client, parent).unwrap();
254+
255+
let expected: Vec<u64> = vec![0, 9];
256+
let found = tree_map.iter().collect::<Vec<_>>();
257+
assert_eq!(found, expected)
258+
}
259+
260+
// this test is ignored by default as it's expensive to allocate such big vecs full of `true`. you can run it via:
261+
// cargo test actions::action_definitions::tests::test_dv_to_bools
262+
#[test]
263+
#[ignore]
264+
fn test_dv_to_bools() {
265+
let mut rb = RoaringTreemap::new();
266+
rb.insert(0);
267+
rb.insert(2);
268+
rb.insert(7);
269+
rb.insert(30854);
270+
rb.insert(4294967297);
271+
rb.insert(4294967300);
272+
let bools = super::treemap_to_bools(rb);
273+
let mut expected = vec![true; 4294967301];
274+
expected[0] = false;
275+
expected[2] = false;
276+
expected[7] = false;
277+
expected[30854] = false;
278+
expected[4294967297] = false;
279+
expected[4294967300] = false;
280+
assert_eq!(bools, expected);
281+
}
282+
}

0 commit comments

Comments
 (0)